KAFKA-19893: Reduce tiered storage redundancy with delayed upload (KIP-1241) #20913

Merged
kamalcph merged 154 commits into
apache:trunkfrom
jiafu1115:storage
May 22, 2026
@jiafu1115
Contributor

@jiafu1115 jiafu1115 commented Nov 18, 2025

JIRA: KAFKA-19893
KIP: KIP-1241

Currently, Kafka uploads all non-active local log segments to remote
storage even when they are still within the local retention period,
resulting in redundant storage of the same data in both tiers.

image

This wastes storage capacity (and cost) without providing immediate
benefits, since reads during the retention window prioritize local data.

However, some users and topics run real-time analytics directly against
remote storage and need the latest data to be available there as soon as
possible. (Even then, remote storage is only as up to date as possible: it
still cannot include the newest data, because the active segment has not
been uploaded yet.) Therefore, this optimization is offered as an optional
topic-level configuration rather than as the default behavior.

Here are some additional thoughts/considerations.

  1. Local files won’t be deleted until they’ve been uploaded to
    remote storage, so this change is safe: you don’t need to worry
    about files being cleaned up before they are uploaded to the remote tier.
  2. Considering the latency of remote storage, the local retention period
    won’t be set too short. For example, in our production environment, we
    keep 1 day of local data alongside 3-7 days in remote storage, so
    there’s still 1 day of redundancy.
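
To make the redundancy in point 2 concrete, here is a rough back-of-the-envelope sketch. The class and method names and the steady ingest rate are illustrative assumptions, not part of this PR or of Kafka; only the 1-day local retention comes from the example above. It also ignores the active segment and segment-size granularity.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka: estimates how much data is stored in both tiers.
final class RedundancyEstimate {

    // With eager upload, every closed segment is copied right away, so roughly the
    // whole local retention window exists both locally and remotely.
    static long redundantBytesEagerUpload(long ingestBytesPerDay, Duration localRetention) {
        return ingestBytesPerDay * localRetention.toHours() / 24;
    }

    // With delayed upload, a segment is copied only once it has aged by copyLag,
    // so only the rest of the local retention window is stored twice.
    static long redundantBytesDelayedUpload(long ingestBytesPerDay,
                                            Duration localRetention,
                                            Duration copyLag) {
        Duration overlap = localRetention.minus(copyLag);
        if (overlap.isNegative()) {
            overlap = Duration.ZERO; // a lag longer than local retention leaves no overlap
        }
        return ingestBytesPerDay * overlap.toHours() / 24;
    }
}
```

Under these assumptions, a topic ingesting a fixed volume per day with 1 day of local retention keeps about one day of data in both tiers with eager upload, and about a quarter of that if uploads are delayed by 18 hours.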

Example for the goal: image

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

Signed-off-by: stroller <fujian1115@gmail.com>
@github-actions github-actions Bot added the triage (PRs from the community), storage (Pull requests that target the storage module), tiered-storage (Related to the Tiered Storage feature), clients, and small (Small PRs) labels Nov 18, 2025
@jiafu1115
Contributor Author

jiafu1115 commented Nov 18, 2025

Attached test result:
[Precondition]
Create one topic with remote storage enabled in Kafka (3 brokers + 3 controllers)

local storage time: 20 minutes
remote storage time: 40 minutes
partitions: 3
segment.bytes: 10M
image

[Steps]

  1. Deploy this code patch to one broker only and restart that broker
  2. Keep sending a large volume of messages to the topic
  3. Check the disk usage on both local and remote storage at two points in time: before the 20-minute mark and after 1 hour.

[Result]

Before 20 minutes:

  1. Only 2 partitions uploaded local segments to remote storage.

After 1 hour:

  1. The remote storage size for one partition (on the broker with the code change) is much smaller than for the other two.
image
  2. The sizes of the local disks are similar.
image

@jiafu1115 jiafu1115 marked this pull request as ready for review November 18, 2025 11:37
@jiafu1115 jiafu1115 changed the title KAFKA-19893: Reduce redundant storage in remote tier (configurable) KAFKA-19893: Reduce redundant storage in remote tier Nov 18, 2025
@jiafu1115 jiafu1115 changed the title KAFKA-19893: Reduce redundant storage in remote tier KAFKA-19893: Reduce redundant storage in remote tier (configurable) Nov 19, 2025
@jiafu1115 jiafu1115 changed the title KAFKA-19893: Reduce redundant storage in remote tier (configurable) KAFKA-19893: Reduce tiered storage redundancy with delayed upload Nov 19, 2025
@jiafu1115 jiafu1115 changed the title KAFKA-19893: Reduce tiered storage redundancy with delayed upload KAFKA-19893: Reduce tiered storage redundancy with delayed upload (KIP-1241) Nov 19, 2025
… for remote storage.

Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
@github-actions

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@jiafu1115
Contributor Author

Hi, @kamalcph
Sorry to bother you. I know you’ve been deeply involved in the remote storage area, and I was wondering if you might be interested — when you have some free time — in taking a look at this cost-saving topic and providing some guidance. Thank you very much!

@jiafu1115
Contributor Author

jiafu1115 commented Dec 2, 2025

image image

cc @kamalcph: posting here because the community mailing list doesn't allow image attachments. We can discuss the KIP content over email. Thanks

@kamalcph
Contributor

kamalcph commented Dec 2, 2025

@jiafu1115

The already uploaded segments are eligible for deletion from the broker. So, when remote storage is down, those segments can be deleted as per the local retention settings and new segments can occupy that space. This gives the admin more time to act when remote storage is down for a long time.

@jiafu1115
Contributor Author

jiafu1115 commented Dec 2, 2025

@kamalcph I think I understand what you mean now. I’ve updated the picture above. Could you help double-check whether we’ve reached the same understanding?
The drawback of this KIP is that, during a long remote storage outage, it will occupy more local disk, so the admin may need an extra disk expansion. The maximum extra usage equals the redundant part we are saving.
Then, after the outage is resolved, disk usage returns to where it started.
Right?
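
The trade-off discussed above can be sketched with a simplified model. Everything here is an assumption made for illustration: a steady ingest rate, a copy lag no larger than the local retention, no active-segment effects, and hypothetical class and method names. The model relies only on the rule stated in this thread that a local segment cannot be deleted before it has been uploaded.

```java
// Hypothetical model, not Kafka code: local disk usage during a remote storage outage.
final class OutageDiskModel {

    // Data younger than copyLagHours was not uploaded when the outage began, and
    // nothing produced during the outage can be uploaded, so none of it can be
    // deleted locally. Everything else is trimmed to the local retention window.
    static long localBytes(long bytesPerHour,
                           long localRetentionHours,
                           long copyLagHours,
                           long outageHours) {
        long notYetUploadedHours = copyLagHours + outageHours;
        return bytesPerHour * Math.max(localRetentionHours, notYetUploadedHours);
    }

    // Extra local bytes compared with eager upload (a copy lag of 0).
    static long extraBytesVersusEager(long bytesPerHour,
                                      long localRetentionHours,
                                      long copyLagHours,
                                      long outageHours) {
        return localBytes(bytesPerHour, localRetentionHours, copyLagHours, outageHours)
                - localBytes(bytesPerHour, localRetentionHours, 0L, outageHours);
    }
}
```

In this model the extra local usage never exceeds the ingest rate multiplied by the copy lag, which is the same amount of redundancy the delayed upload removes in normal operation.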

@jiafu1115 jiafu1115 changed the title KAFKA-19893: Reduce tiered storage redundancy with delayed upload (KIP-1241) KAFKA-19893: Reduce tiered storage redundancy with delayed upload (Topic-level feature) (KIP-1241) Dec 4, 2025
@jiafu1115
Contributor Author

jiafu1115 commented May 18, 2026

@kamalcph

Hi Kamal:
Sorry to trouble you. I refactored the code based on your suggestion.
0 is the default value and means upload immediately. -1 means the maximum lag, which is taken from the local retention time/size.

The logic seems a bit simpler now and the concepts are clearer, and it’s not as complicated as I originally thought, so I decided to go with your approach. Could you help review it again?

I pasted the test matrix here for your reference to help check the logic.

image

Contributor

@kamalcph kamalcph left a comment

Thanks @jiafu1115 for addressing the review comments. We may have to further simplify the logic.

  1. The current logic has multiple negative conditions, which makes it harder to follow. The delayCopy method internally calls notExceededCopyLagTime and notExceededCopyLagSize. Both of these not... methods check whether the threshold is exceeded and then invert the result again. Can we update this logic to use a positive flow instead?

(eg)

List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
...
...
 if (isEligibleForUpload(log.config(), previousSeg, currentTimeMs, totalLogSize, cumulativeSize)) {
      candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
  } else {
      break;
  }

and update the delay logic to:

private boolean isEligibleForUpload(LogConfig logConfig,
                                LogSegment previousSeg,
                                long currentTimeMs,
                                long totalLogSize,
                                long cumulativeSize) {
  long effectiveCopyLagMs = logConfig.remoteCopyLagMs();
  // derive the value from local retention time when copyLagMs configured to -1
  if (effectiveCopyLagMs == -1L) {
      effectiveCopyLagMs = logConfig.localRetentionMs();
      // if the local retention time is configured to infinite, then configure copyLagMs as infinite
      if (effectiveCopyLagMs == -1L) {
          effectiveCopyLagMs = Long.MAX_VALUE;
      }
  }
  long effectiveCopyLagBytes = logConfig.remoteCopyLagBytes();
  // derive the value from local retention size when copyLagBytes configured to -1
  if (effectiveCopyLagBytes == -1L) {
      effectiveCopyLagBytes = logConfig.localRetentionBytes();
      // if the local retention size is configured to infinite, then configure copyLagBytes as infinite
      if (effectiveCopyLagBytes == -1L) {
          effectiveCopyLagBytes = Long.MAX_VALUE;
      }
  }
  
  try {
      long segmentAgeMs = currentTimeMs - previousSeg.largestTimestamp();
      // If the segment's largestTimestamp is higher than the current time, then allow the segment to upload.
      boolean shouldUploadNow = segmentAgeMs < 0;
      // Upload when either of remote-copy lag time/size breaches the threshold.
      if (!shouldUploadNow) {
          shouldUploadNow = segmentAgeMs >= effectiveCopyLagMs;
      }
      if (!shouldUploadNow) {
          long sizeLagBytes = totalLogSize - cumulativeSize;
          shouldUploadNow = sizeLagBytes >= effectiveCopyLagBytes;
      }
      return shouldUploadNow;
  } catch (IOException e) {
      // in case of any error, allow the segment to upload. Should not block the upload that might hinder the
      // deletion logic
      LOGGER.warn("Failed to get largest timestamp for segment {}, marking it as eligible for upload based on time", previousSeg, e);
      return true;
  }
}

I ran the newly added unit tests and they pass.

  2. Also, move the unit tests from RemoteLogManagerTest to a new RemoteLagCopyTest, since RemoteLogManagerTest is huge at 4k+ lines.
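
For readers who want to experiment with the decision logic suggested in this review, below is a self-contained sketch that swaps Kafka's LogConfig and LogSegment for plain long parameters. The class name and the flattened signature are illustrative assumptions, and the reading of cumulativeSize as the bytes up to and including the candidate segment is also an assumption. The -1 resolution and the "either threshold triggers upload" rule follow the snippet above.

```java
// Hypothetical stand-alone version of the eligibility check; not the merged Kafka code.
final class CopyLagEligibilitySketch {

    // -1 means "derive from local retention"; if local retention is also -1
    // (unlimited), the lag is treated as unlimited too.
    static long effectiveLag(long configuredLag, long localRetention) {
        if (configuredLag != -1L) {
            return configuredLag;
        }
        return localRetention == -1L ? Long.MAX_VALUE : localRetention;
    }

    static boolean isEligibleForUpload(long copyLagMs, long copyLagBytes,
                                       long localRetentionMs, long localRetentionBytes,
                                       long segmentLargestTimestampMs, long currentTimeMs,
                                       long totalLogSize, long cumulativeSize) {
        long effectiveCopyLagMs = effectiveLag(copyLagMs, localRetentionMs);
        long effectiveCopyLagBytes = effectiveLag(copyLagBytes, localRetentionBytes);

        long segmentAgeMs = currentTimeMs - segmentLargestTimestampMs;
        // A timestamp in the future gives a negative age: upload rather than wait.
        if (segmentAgeMs < 0) {
            return true;
        }
        // Time-based trigger.
        if (segmentAgeMs >= effectiveCopyLagMs) {
            return true;
        }
        // Size-based trigger: bytes in the log that are newer than this segment.
        long sizeLagBytes = totalLogSize - cumulativeSize;
        return sizeLagBytes >= effectiveCopyLagBytes;
    }
}
```

Because either threshold alone makes a segment eligible, leaving one of the two lags at 0 results in immediate upload regardless of the other value. That interaction is what the later comments in this conversation discuss.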

jiafu1115 and others added 4 commits May 18, 2026 15:40
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
@jiafu1115 jiafu1115 requested a review from kamalcph May 18, 2026 14:10
jiafu1115 added 7 commits May 19, 2026 04:14
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
Signed-off-by: stroller.fu <stroller.fu@zoom.us>
jiafu1115 added 4 commits May 21, 2026 19:43
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
Signed-off-by: Jian <fujian1115@gmail.com>
@jiafu1115 jiafu1115 requested a review from kamalcph May 21, 2026 15:53
Contributor

@kamalcph kamalcph left a comment

LGTM, thanks for addressing the review comments! Left a few minor comments.

Comment thread core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
}

@Test
def testDynamicRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
Contributor

could you also add one test for valid dynamic broker config change? Thanks!

Contributor

Also, move the testDynamicRemoteCopyLagThrowsOnIncorrectConfig test to DynamicBrokerConfigTest.java instead of DynamicBrokerConfigTest.scala.

Contributor

This can be taken up later, we already have one dynamic broker config change test in KafkaConfigTest.

Contributor Author

Got it. thanks @kamalcph
Thank you very much for your patient and thorough review.

@kamalcph kamalcph merged commit edcada2 into apache:trunk May 22, 2026
31 checks passed
Member

@chia7712 chia7712 left a comment

@jiafu1115 thanks for this patch. It is cooool

private boolean eligibleUploadByTime(LogSegment segment, long currentTimeMs, long copyLagMs) {
try {
long segmentAgeMs = currentTimeMs - segment.largestTimestamp();
boolean eligibleUpload = segmentAgeMs < 0 || segmentAgeMs >= copyLagMs;
Member

This is a lingering issue. The local segment with a future timestamp is still NOT deleted, right? Should we allow the deletion of these local files once they have been successfully uploaded to remote storage, even if they contain future timestamps?

Contributor

Should we allow the deletion of these local files once they have been successfully uploaded to remote storage, even if they contain future timestamps?

Currently, it gets deleted based on the local retention bytes limit, since the local retention deletion logic follows methods similar to the full deletion logic.

https://sourcegraph.com/r/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java?L2008

Member

The current behavior looks like a bug to me, as local segments shouldn't be blocked from deletion once they are already uploaded. Perhaps we could fall back to checking the lastModifiedTime when a segment contains future records. Alternatively, we could introduce a new configuration flag to provide alternative logic for handling future records.
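
The lastModifiedTime fallback proposed here could look roughly like the sketch below. It is only an illustration of the idea under discussion, not Kafka's current behavior: the class and method names are hypothetical, and the segment is reduced to two long timestamps.

```java
// Hypothetical sketch of the proposed fallback; not Kafka code.
final class FutureTimestampFallback {

    // Use the record-derived timestamp unless it lies in the future, in which
    // case fall back to the file's last-modified time.
    static long retentionTimestampMs(long largestTimestampMs, long lastModifiedMs, long nowMs) {
        return largestTimestampMs > nowMs ? lastModifiedMs : largestTimestampMs;
    }

    // Time-based retention check built on the fallback timestamp.
    // A negative retentionMs is treated as "unlimited" in this sketch.
    static boolean breachesRetention(long largestTimestampMs, long lastModifiedMs,
                                     long nowMs, long retentionMs) {
        if (retentionMs < 0) {
            return false;
        }
        long referenceMs = retentionTimestampMs(largestTimestampMs, lastModifiedMs, nowMs);
        return nowMs - referenceMs > retentionMs;
    }
}
```

With this fallback, a segment whose records carry future timestamps would age out based on when the file was last written, instead of staying on local disk until the wall clock passes the future timestamp.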

Contributor

shouldn't be blocked from deletion once they are already uploaded. Perhaps we could fall back to checking the lastModifiedTime when a segment contains future records.

Yeah, we can allow this behavior by introducing a config. The segment exists in remote storage, but users might see slow reads from remote if the Remote Storage Manager does not implement prefetching. So it is better to gate the behavior change behind a config, and to apply it only when remote storage is enabled on the topic.

Member

If we all hate -2, there is another approach: reverse the configuration logic.

For example, we could introduce remote.copy.before.deletion.ms instead of remote.copy.lag.ms. If a user sets remote.copy.before.deletion.ms=2hr, it means they want to upload log segments 2 hours before they are scheduled to be deleted from local storage.

The default value would be -1, which dynamically resolves to local.retention.ms. This fully maintains backward compatibility (resulting in immediate upload). Setting it to 0 means the user wants to delay the upload as much as possible, triggering it right before local deletion.

The only side effect is that this kind of "reverse" or "countdown" time calculation is quite rare in the existing Kafka codebase.
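
As a worked illustration of the "countdown" semantics described in this comment, the sketch below converts a remote.copy.before.deletion.ms value into the equivalent copy lag. The class and method names are hypothetical, the config was only proposed in this thread, and the sketch assumes a finite, non-negative local retention.

```java
// Hypothetical conversion for the proposed (not adopted) reverse configuration.
final class ReverseLagConversion {

    static long equivalentCopyLagMs(long copyBeforeDeletionMs, long localRetentionMs) {
        // -1 resolves to the local retention itself, as described in the comment.
        long resolvedMs = copyBeforeDeletionMs == -1L ? localRetentionMs : copyBeforeDeletionMs;
        // Uploading X ms before local deletion equals waiting (retention - X) ms after
        // the segment becomes eligible; clamp at 0 when X exceeds the retention.
        return Math.max(0L, localRetentionMs - resolvedMs);
    }
}
```

With 24 hours of local retention, the default of -1 gives a lag of 0 (immediate upload), a value of 2 hours gives a lag of 22 hours, and 0 gives a lag of the full 24 hours.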

Contributor

@kamalcph kamalcph May 25, 2026

The only side effect is that this kind of "reverse"

yeah, this is not intuitive when compared with the other configs like retention time/bytes. My suggestion is to:

  1. Improve the config documentation about configuring both the values and
  2. Add validation to throw an error when remote-copy-lag-time = 0 and remote-copy-lag-bytes != 0 and vice-versa.
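
The validation in suggestion 2 could be sketched as follows. This is a minimal illustration, not the code from this PR: the class name is hypothetical, and it throws IllegalArgumentException only to stay self-contained, whereas a real Kafka validator would more likely use the project's own config exception type.

```java
// Hypothetical validator for the pair of remote-copy lag settings.
final class RemoteCopyLagValidation {

    static void validate(long copyLagMs, long copyLagBytes) {
        boolean timeIsImmediate = copyLagMs == 0L;
        boolean sizeIsImmediate = copyLagBytes == 0L;
        // Reject the mixed case: one lag is 0 (upload immediately) while the other
        // asks for a delay, because the 0 would silently win.
        if (timeIsImmediate != sizeIsImmediate) {
            throw new IllegalArgumentException(
                "remote copy lag time and bytes must either both be 0 or both be non-zero, got time="
                    + copyLagMs + " and bytes=" + copyLagBytes);
        }
    }
}
```

The check accepts 0/0 (eager upload) and any pair where both values are non-zero, including -1, and rejects the combinations where exactly one value is 0.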

Member

I just realized that I replied to the wrong thread earlier, sorry about that 😢

How about this solution? if we found it contain future record. we use LogSegment#lastModified to compare the time?

Yes, that is an acceptable approach. I've opened https://issues.apache.org/jira/browse/KAFKA-20609 so we can keep discussing there.

yeah, this is not intuitive when compared with the other configs like retention time/bytes. My suggestion is to:

While documentation is the final line of defense for users, it's better to have an intuitive design out of the box. After all, my proposed reverse configuration was rejected precisely because it wasn't intuitive enough.

Another way is to align the logic with retention.ms and retention.bytes. Since most users care more about time than size, we could set the default value of the time lag to 0, and the size lag to -1. This way, most users can just adjust the time setting without having to touch the size configuration. WDYT?

Contributor

@kamalcph kamalcph May 26, 2026

  • Thanks for filing the KAFKA-20609 ticket. Fixing this is out of scope for KIP-1241, as the bug existed before too.

Since most users care more about time than size, we could set the default value of the time lag to 0, and the size lag to -1.

I like the idea of providing out-of-the-box default values that work in the majority of use cases and retain the eager upload logic. The new default values for remote-copy lag align with log.retention.hours = 7 days and log.retention.bytes = infinite (-1). It LGTM.

Contributor Author

@kamalcph @chia7712 ack
Let me take time to think and handle it. Thanks

Contributor

Opened #22394 to address this comment. PTAL.

}
}

private static void validateRemoteCopyLagTime(Map<?, ?> props) {
Member

Map<?, ?> ->Map<String, ?>

Contributor Author

This duplicates another comment. I proposed the fix in #22363. Thanks.

}
}

private static void validateRemoteCopyLagSize(Map<?, ?> props) {
Member

Map<?, ?> ->Map<String, ?>

Contributor Author

@chia7712 thanks for review. Fix it on #22363

previousSeg, copyLagMs, copyLagBytes, currentTimeMs, totalLogSize, cumulativeSize, totalLogSize - cumulativeSize);
}

if (copyLagMs == 0 || copyLagBytes == 0) {
Member

Should we log a warning or info message when a user configures one lag property but leaves the other at its default value of 0?

Contributor Author

@chia7712 Thanks for your view.
How about describing this case in the documentation? If we only log it as a warning, users may ignore it. WDYT?

Member

Sounds good. We can expand upgrade.md as well.

chia7712 pushed a commit that referenced this pull request May 23, 2026
This is a follow-up PR for
#20913 (comment)
Thanks.  cc @kamalcph

Reviewers: Murali Basani <muralidhar.basani@aiven.io>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

Labels

ci-approved clients core Kafka Broker storage Pull requests that target the storage module tiered-storage Related to the Tiered Storage feature
